package org.qy.hbase.server.service.impl;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.qy.hbase.server.api.HbaseTemplate;
import org.qy.hbase.server.api.RowMapper;
import org.qy.hbase.server.rowmapper.BaseRowMapper;
import org.qy.hbase.server.service.HbaseService;
import org.qy.hbase.server.utils.ColumnUtils;
import org.qy.hbase.server.utils.MutationUtils;
import org.qy.hbase.server.utils.RowMapperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

@Service
public class HbaseServiceImpl implements HbaseService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private HbaseTemplate hbaseTemplate;

    /**
     * 条件查询对应表属性
     *
     * @param scan
     * @param rowMapper
     * @param <T>
     * @return
     */
    @Override
    public <T> List<T> find(Scan scan, RowMapper<T> rowMapper) {
        try {
            if (rowMapper instanceof BaseRowMapper) {
                BaseRowMapper baseRowMapper = (BaseRowMapper)rowMapper;
                final String entityName = baseRowMapper.getCls().getSimpleName();
                final String tableName = ColumnUtils.camelToUnderline(entityName);
                return find(tableName, scan, rowMapper);
            }
        }
        catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return new ArrayList<>();
    }

    /**
     * 根据条件查询表属性
     *
     * @param tableName
     * @param scan
     * @param rowMapper
     * @param <T>
     * @return
     */
    @Override public <T> List<T> find(String tableName, Scan scan, RowMapper<T> rowMapper) {
        try {
            return hbaseTemplate.find(tableName, scan, rowMapper);
        }
        catch (Exception e) {
            return new ArrayList<>();
        }
    }

    /**
     * rowkey查询
     *
     * @param rowKey
     * @param rowMapper
     * @param <T>
     * @return
     */
    @Override public <T> T get(String rowKey, RowMapper<T> rowMapper) {
        try {
            BaseRowMapper baseRowMapper = (BaseRowMapper)rowMapper;
            final String entityName = baseRowMapper.getCls().getSimpleName();
            final String tableName = ColumnUtils.camelToUnderline(entityName);
            return get(tableName, rowKey, rowMapper);
        }
        catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }

    /**
     * rowKey查询
     *
     * @param tableName
     * @param rowKey
     * @param rowMapper
     * @param <T>
     * @return
     */
    @Override public <T> T get(String tableName, String rowKey, RowMapper<T> rowMapper) {
        try {
            return hbaseTemplate.get(tableName, rowKey, rowMapper);
        }
        catch (Exception e) {
            return null;
        }
    }

    /**
     * 根据条件查询
     *
     * @param tableName
     * @param scan
     * @param paramMap
     * @param rowMapper
     * @return
     */
    @Override public <T> List<T> getListByParam(String tableName, Scan scan, RowMapper<T> rowMapper, HashMap<String, String> paramMap) {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (Objects.nonNull(paramMap) && paramMap.size() > 0) {
            for (String key : paramMap.keySet()) {
                SingleColumnValueFilter single = new SingleColumnValueFilter(Bytes.toBytes(RowMapperUtils.DEFAULT_FAMILY),
                        Bytes.toBytes(key), CompareOperator.EQUAL, new SubstringComparator(paramMap.get(key)));
                filterList.addFilter(single);
            }
        }
        scan.setFilter(filterList);
        if (Objects.nonNull(paramMap) && paramMap.containsKey("pageSize")) {
            scan.setLimit(Integer.valueOf(paramMap.get("pageSize")));
        }else {
            scan.setLimit(1);
        }
        return this.find(tableName, scan, rowMapper);
    }

    /**
     * 更新或插入数据
     *
     * @param t
     * @param <T>
     * @return
     */
    @Override public <T> int upsert(T t) {
        final String entityName = t.getClass().getSimpleName();
        final String tableName = ColumnUtils.camelToUnderline(entityName);
        return upsert(tableName, t);
    }

    /**
     * 更新或插入数据
     *
     * @param tableName
     * @param t
     * @param <T>
     * @return
     */
    @Override public <T> int upsert(String tableName, T t) {
        final Put p = MutationUtils.generate(t);
        if (p != null) {
            hbaseTemplate.saveOrUpdate(tableName, p);
            return 1;
        }
        return 0;
    }

    /**
     * 批量更新或插入数据
     *
     * @param tableName
     * @param dataList
     * @param <T>
     * @return
     */
    @Override public <T> int upsert(String tableName, List<T> dataList) {
        try {
            final List<Mutation> pList = MutationUtils.generate(dataList);
            if (CollectionUtils.isNotEmpty(pList)) {
                hbaseTemplate.saveOrUpdates(tableName, pList);
                return pList.size();
            }
        }
        catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return 0;
    }

    /**
     * 批量更新或插入数据
     *
     * @param dataList
     * @param <T>
     * @return
     */
    @Override public <T> int upsert(List<T> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return 0;
        }
        final String entityName = dataList.get(0).getClass().getSimpleName();
        final String tableName = ColumnUtils.camelToUnderline(entityName);
        return upsert(tableName, dataList);
    }

    @Override
    public <T> int upsert(List<T> dataList, int flushSize){
        if(CollectionUtils.isEmpty(dataList)){
            return 0;
        }
        final String entityName = dataList.get(0).getClass().getSimpleName();
        final String tableName = ColumnUtils.camelToUnderline(entityName);
        return upsert(tableName,dataList, flushSize);
    }

    @Override
    public <T> int upsert(String tableName, List<T> dataList, int flushSize){
        if(CollectionUtils.isEmpty(dataList)){
            return 0;
        }
        int result = 0;
        int count = dataList.size() % flushSize == 0 ? dataList.size() / flushSize : dataList.size() / flushSize + 1;
        for (int i = 0; i < count; i++) {
            int end = (i + 1) * flushSize;
            end = end > dataList.size() ? dataList.size() : end;
            List<T> subList = dataList.subList(i * flushSize, end);
            result += upsert(tableName, subList);
        }
        return result;
    }

    /**
     * 创建表
     *
     * @param tableName 表名
     * @param splits    分区
     */
    @Override public void createTable(String tableName, byte[][] splits)
            throws Exception
    {
        Admin admin = null;
        try {
            admin = hbaseTemplate.getConnection().getAdmin();
            final ColumnFamilyDescriptor columnFamilyDescriptor =
                    ColumnFamilyDescriptorBuilder.newBuilder(RowMapperUtils.DEFAULT_FAMILY.getBytes()).
                            setCompressionType(Compression.Algorithm.SNAPPY).build();
            final TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).
                    setColumnFamily(columnFamilyDescriptor).build();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                logger.info("create table done, table exists! the tableName is: " + tableName);
            }
            else if (splits != null) {
                admin.createTable(tableDescriptor, splits);
            }
            else {
                admin.createTable(tableDescriptor);
            }
        }
        catch (Exception e) {
            logger.error("create table error: " + e.getMessage(), e);
            throw new Exception(e);
        }
        finally {
            if (admin != null) {
                try {
                    admin.close();
                }
                catch (IOException e) {
                    logger.error("close admin error: " + e.getMessage(), e);
                }
            }
        }
    }

    /**
     * 创建表
     *
     * @param tableName
     */
    @Override public void createTable(String tableName)
            throws Exception
    {
        createTable(tableName, null);
    }

}
